home *** CD-ROM | disk | FTP | other *** search
/ Enter 2006 September / Enter 09 2006.iso / Internet / SpamExperts Home 1.1 / SpamExperts Home.exe / lib / spamexperts.modules / ZODB / DB.pyc (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2006-07-14  |  24.6 KB  |  762 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.4)
  3.  
  4. '''Database objects
  5.  
  6. $Id: DB.py 41063 2006-01-02 01:29:13Z tim_one $'''
  7. import cPickle
  8. import cStringIO
  9. import sys
  10. import threading
  11. from time import time, ctime
  12. import logging
  13. from ZODB.broken import find_global
  14. from ZODB.utils import z64
  15. from ZODB.Connection import Connection
  16. from ZODB.serialize import referencesf
  17. from ZODB.utils import WeakSet
  18. from zope.interface import implements
  19. from ZODB.interfaces import IDatabase
  20. import transaction
  21. logger = logging.getLogger('ZODB.DB')
  22.  
  23. class _ConnectionPool(object):
  24.     '''Manage a pool of connections.
  25.  
  26.     CAUTION:  Methods should be called under the protection of a lock.
  27.     This class does no locking of its own.
  28.  
  29.     There\'s no limit on the number of connections this can keep track of,
  30.     but a warning is logged if there are more than pool_size active
  31.     connections, and a critical problem if more than twice pool_size.
  32.  
  33.     New connections are registered via push().  This will log a message if
  34.     "too many" connections are active.
  35.  
  36.     When a connection is explicitly closed, tell the pool via repush().
  37.     That adds the connection to a stack of connections available for
  38.     reuse, and throws away the oldest stack entries if the stack is too large.
  39.     pop() pops this stack.
  40.  
  41.     When a connection is obtained via pop(), the pool holds only a weak
  42.     reference to it thereafter.  It\'s not necessary to inform the pool
  43.     if the connection goes away.  A connection handed out by pop() counts
  44.     against pool_size only so long as it exists, and provided it isn\'t
  45.     repush()\'ed.  A weak reference is retained so that DB methods like
  46.     connectionDebugInfo() can still gather statistics.
  47.     '''
  48.     
  49.     def __init__(self, pool_size):
  50.         self.pool_size = pool_size
  51.         self.all = WeakSet()
  52.         self.available = []
  53.  
  54.     
  55.     def set_pool_size(self, pool_size):
  56.         self.pool_size = pool_size
  57.         self._reduce_size()
  58.  
  59.     
  60.     def push(self, c):
  61.         if not c not in self.all:
  62.             raise AssertionError
  63.         if not c not in self.available:
  64.             raise AssertionError
  65.         self._reduce_size(strictly_less = True)
  66.         self.all.add(c)
  67.         self.available.append(c)
  68.         n = len(self.all)
  69.         limit = self.pool_size
  70.         if n > limit:
  71.             reporter = logger.warn
  72.             if n > 2 * limit:
  73.                 reporter = logger.critical
  74.             
  75.             reporter('DB.open() has %s open connections with a pool_size of %s', n, limit)
  76.         
  77.  
  78.     
  79.     def repush(self, c):
  80.         if not c in self.all:
  81.             raise AssertionError
  82.         if not c not in self.available:
  83.             raise AssertionError
  84.         self._reduce_size(strictly_less = True)
  85.         self.available.append(c)
  86.  
  87.     
  88.     def _reduce_size(self, strictly_less = False):
  89.         target = self.pool_size - bool(strictly_less)
  90.         while len(self.available) > target:
  91.             c = self.available.pop(0)
  92.             self.all.remove(c)
  93.             c._resetCache()
  94.  
  95.     
  96.     def pop(self):
  97.         result = None
  98.         if self.available:
  99.             result = self.available.pop()
  100.             if not result in self.all:
  101.                 raise AssertionError
  102.         
  103.         return result
  104.  
  105.     
  106.     def map(self, f):
  107.         self.all.map(f)
  108.  
  109.  
  110.  
  111. class DB(object):
  112.     """The Object Database
  113.     -------------------
  114.  
  115.     The DB class coordinates the activities of multiple database
  116.     Connection instances.  Most of the work is done by the
  117.     Connections created via the open method.
  118.  
  119.     The DB instance manages a pool of connections.  If a connection is
  120.     closed, it is returned to the pool and its object cache is
  121.     preserved.  A subsequent call to open() will reuse the connection.
  122.     There is no hard limit on the pool size.  If more than `pool_size`
  123.     connections are opened, a warning is logged, and if more than twice
  124.     that many, a critical problem is logged.
  125.  
  126.     The class variable 'klass' is used by open() to create database
  127.     connections.  It is set to Connection, but a subclass could override
  128.     it to provide a different connection implementation.
  129.  
  130.     The database provides a few methods intended for application code
  131.     -- open, close, undo, and pack -- and a large collection of
  132.     methods for inspecting the database and its connections' caches.
  133.  
  134.     :Cvariables:
  135.       - `klass`: Class used by L{open} to create database connections
  136.  
  137.     :Groups:
  138.       - `User Methods`: __init__, open, close, undo, pack, classFactory
  139.       - `Inspection Methods`: getName, getSize, objectCount,
  140.         getActivityMonitor, setActivityMonitor
  141.       - `Connection Pool Methods`: getPoolSize, getVersionPoolSize,
  142.         removeVersionPool, setPoolSize, setVersionPoolSize
  143.       - `Transaction Methods`: invalidate
  144.       - `Other Methods`: lastTransaction, connectionDebugInfo
  145.       - `Version Methods`: modifiedInVersion, abortVersion, commitVersion,
  146.         versionEmpty
  147.       - `Cache Inspection Methods`: cacheDetail, cacheExtremeDetail,
  148.         cacheFullSweep, cacheLastGCTime, cacheMinimize, cacheSize,
  149.         cacheDetailSize, getCacheSize, getVersionCacheSize, setCacheSize,
  150.         setVersionCacheSize
  151.     """
  152.     implements(IDatabase)
  153.     klass = Connection
  154.     _activity_monitor = None
  155.     
  156.     def __init__(self, storage, pool_size = 7, cache_size = 400, version_pool_size = 3, version_cache_size = 100, database_name = 'unnamed', databases = None):
  157.         '''Create an object database.
  158.  
  159.         :Parameters:
  160.           - `storage`: the storage used by the database, e.g. FileStorage
  161.           - `pool_size`: expected maximum number of open connections
  162.           - `cache_size`: target size of Connection object cache
  163.           - `version_pool_size`: expected maximum number of connections (per
  164.             version)
  165.           - `version_cache_size`: target size of Connection object cache for
  166.             version connections
  167.         '''
  168.         x = threading.RLock()
  169.         self._a = x.acquire
  170.         self._r = x.release
  171.         self._pools = { }
  172.         self._pool_size = pool_size
  173.         self._cache_size = cache_size
  174.         self._version_pool_size = version_pool_size
  175.         self._version_cache_size = version_cache_size
  176.         self._miv_cache = { }
  177.         self._storage = storage
  178.         storage.registerDB(self, None)
  179.         if not hasattr(storage, 'tpc_vote'):
  180.             
  181.             storage.tpc_vote = lambda *args: pass
  182.         
  183.         
  184.         try:
  185.             storage.load(z64, '')
  186.         except KeyError:
  187.             PersistentMapping = PersistentMapping
  188.             import persistent.mapping
  189.             root = PersistentMapping()
  190.             file = cStringIO.StringIO()
  191.             p = cPickle.Pickler(file, 1)
  192.             p.dump((root.__class__, None))
  193.             p.dump(root.__getstate__())
  194.             t = transaction.Transaction()
  195.             t.description = 'initial database creation'
  196.             storage.tpc_begin(t)
  197.             storage.store(z64, None, file.getvalue(), '', t)
  198.             storage.tpc_vote(t)
  199.             storage.tpc_finish(t)
  200.  
  201.         if databases is None:
  202.             databases = { }
  203.         
  204.         self.databases = databases
  205.         self.database_name = database_name
  206.         if database_name in databases:
  207.             raise ValueError('database_name %r already in databases' % database_name)
  208.         
  209.         databases[database_name] = self
  210.         for m in [
  211.             'history',
  212.             'supportsUndo',
  213.             'supportsVersions',
  214.             'undoLog',
  215.             'versionEmpty',
  216.             'versions']:
  217.             setattr(self, m, getattr(storage, m))
  218.         
  219.         if hasattr(storage, 'undoInfo'):
  220.             self.undoInfo = storage.undoInfo
  221.         
  222.  
  223.     
  224.     def _returnToPool(self, connection):
  225.         '''Return a connection to the pool.
  226.  
  227.         connection._db must be self on entry.
  228.         '''
  229.         self._a()
  230.         
  231.         try:
  232.             if not connection._db is self:
  233.                 raise AssertionError
  234.             connection._opened = None
  235.             am = self._activity_monitor
  236.             if am is not None:
  237.                 am.closedConnection(connection)
  238.             
  239.             version = connection._version
  240.             
  241.             try:
  242.                 pool = self._pools[version]
  243.             except KeyError:
  244.                 connection.__dict__.clear()
  245.                 return None
  246.  
  247.             pool.repush(connection)
  248.         finally:
  249.             self._r()
  250.  
  251.  
  252.     
  253.     def _connectionMap(self, f):
  254.         self._a()
  255.         
  256.         try:
  257.             for pool in self._pools.values():
  258.                 pool.map(f)
  259.         finally:
  260.             self._r()
  261.  
  262.  
  263.     
  264.     def abortVersion(self, version, txn = None):
  265.         if txn is None:
  266.             txn = transaction.get()
  267.         
  268.         txn.register(AbortVersion(self, version))
  269.  
  270.     
  271.     def cacheDetail(self):
  272.         '''Return information on objects in the various caches
  273.  
  274.         Organized by class.
  275.         '''
  276.         detail = { }
  277.         
  278.         def f(con, detail = detail):
  279.             for oid, ob in con._cache.items():
  280.                 module = getattr(ob.__class__, '__module__', '')
  281.                 if not module or '%s.' % module:
  282.                     pass
  283.                 module = ''
  284.                 c = '%s%s' % (module, ob.__class__.__name__)
  285.                 if c in detail:
  286.                     detail[c] += 1
  287.                     continue
  288.                 detail[c] = 1
  289.             
  290.  
  291.         self._connectionMap(f)
  292.         detail = detail.items()
  293.         detail.sort()
  294.         return detail
  295.  
  296.     
  297.     def cacheExtremeDetail(self):
  298.         detail = []
  299.         conn_no = [
  300.             0]
  301.         
  302.         def f(con, detail = detail, rc = sys.getrefcount, conn_no = conn_no):
  303.             conn_no[0] += 1
  304.             cn = conn_no[0]
  305.             for oid, ob in con._cache_items():
  306.                 id = ''
  307.                 if hasattr(ob, '__dict__'):
  308.                     d = ob.__dict__
  309.                     if d.has_key('id'):
  310.                         id = d['id']
  311.                     elif d.has_key('__name__'):
  312.                         id = d['__name__']
  313.                     
  314.                 
  315.                 module = getattr(ob.__class__, '__module__', '')
  316.                 if not module or '%s.' % module:
  317.                     pass
  318.                 module = ''
  319.                 detail.append({
  320.                     'conn_no': cn,
  321.                     'oid': oid,
  322.                     'id': id,
  323.                     'klass': '%s%s' % (module, ob.__class__.__name__),
  324.                     'rc': rc(ob) - 3 - (ob._p_changed is not None),
  325.                     'state': ob._p_changed })
  326.             
  327.  
  328.         self._connectionMap(f)
  329.         return detail
  330.  
  331.     
  332.     def cacheFullSweep(self):
  333.         self._connectionMap((lambda c: c._cache.full_sweep()))
  334.  
  335.     
  336.     def cacheLastGCTime(self):
  337.         m = [
  338.             0]
  339.         
  340.         def f(con, m = m):
  341.             t = con._cache.cache_last_gc_time
  342.             if t > m[0]:
  343.                 m[0] = t
  344.             
  345.  
  346.         self._connectionMap(f)
  347.         return m[0]
  348.  
  349.     
  350.     def cacheMinimize(self):
  351.         self._connectionMap((lambda c: c._cache.minimize()))
  352.  
  353.     
  354.     def cacheSize(self):
  355.         m = [
  356.             0]
  357.         
  358.         def f(con, m = m):
  359.             m[0] += con._cache.cache_non_ghost_count
  360.  
  361.         self._connectionMap(f)
  362.         return m[0]
  363.  
  364.     
  365.     def cacheDetailSize(self):
  366.         m = []
  367.         
  368.         def f(con, m = m):
  369.             m.append({
  370.                 'connection': repr(con),
  371.                 'ngsize': con._cache.cache_non_ghost_count,
  372.                 'size': len(con._cache) })
  373.  
  374.         self._connectionMap(f)
  375.         m.sort()
  376.         return m
  377.  
  378.     
  379.     def close(self):
  380.         '''Close the database and its underlying storage.
  381.  
  382.         It is important to close the database, because the storage may
  383.         flush in-memory data structures to disk when it is closed.
  384.         Leaving the storage open with the process exits can cause the
  385.         next open to be slow.
  386.  
  387.         What effect does closing the database have on existing
  388.         connections?  Technically, they remain open, but their storage
  389.         is closed, so they stop behaving usefully.  Perhaps close()
  390.         should also close all the Connections.
  391.         '''
  392.         self._storage.close()
  393.  
  394.     
  395.     def commitVersion(self, source, destination = '', txn = None):
  396.         if txn is None:
  397.             txn = transaction.get()
  398.         
  399.         txn.register(CommitVersion(self, source, destination))
  400.  
  401.     
  402.     def getCacheSize(self):
  403.         return self._cache_size
  404.  
  405.     
  406.     def lastTransaction(self):
  407.         return self._storage.lastTransaction()
  408.  
  409.     
  410.     def getName(self):
  411.         return self._storage.getName()
  412.  
  413.     
  414.     def getPoolSize(self):
  415.         return self._pool_size
  416.  
  417.     
  418.     def getSize(self):
  419.         return self._storage.getSize()
  420.  
  421.     
  422.     def getVersionCacheSize(self):
  423.         return self._version_cache_size
  424.  
  425.     
  426.     def getVersionPoolSize(self):
  427.         return self._version_pool_size
  428.  
  429.     
  430.     def invalidate(self, tid, oids, connection = None, version = ''):
  431.         '''Invalidate references to a given oid.
  432.  
  433.         This is used to indicate that one of the connections has committed a
  434.         change to the object.  The connection commiting the change should be
  435.         passed in to prevent useless (but harmless) messages to the
  436.         connection.
  437.         '''
  438.         if connection is not None:
  439.             version = connection._version
  440.         
  441.         for oid in oids.keys():
  442.             h = hash(oid) % 131
  443.             o = self._miv_cache.get(h, None)
  444.             if o is not None and o[0] == oid:
  445.                 del self._miv_cache[h]
  446.                 continue
  447.         
  448.         
  449.         def inval(c):
  450.             if c is not connection:
  451.                 if not version or c._version == version:
  452.                     c.invalidate(tid, oids)
  453.                 
  454.  
  455.         self._connectionMap(inval)
  456.  
  457.     
  458.     def modifiedInVersion(self, oid):
  459.         h = hash(oid) % 131
  460.         cache = self._miv_cache
  461.         o = cache.get(h, None)
  462.         if o and o[0] == oid:
  463.             return o[1]
  464.         
  465.         v = self._storage.modifiedInVersion(oid)
  466.         cache[h] = (oid, v)
  467.         return v
  468.  
  469.     
  470.     def objectCount(self):
  471.         return len(self._storage)
  472.  
  473.     
  474.     def open(self, version = '', mvcc = True, transaction_manager = None, synch = True):
  475.         '''Return a database Connection for use by application code.
  476.  
  477.         The optional `version` argument can be used to specify that a
  478.         version connection is desired.
  479.  
  480.         Note that the connection pool is managed as a stack, to
  481.         increase the likelihood that the connection\'s stack will
  482.         include useful objects.
  483.  
  484.         :Parameters:
  485.           - `version`: the "version" that all changes will be made
  486.              in, defaults to no version.
  487.           - `mvcc`: boolean indicating whether MVCC is enabled
  488.           - `transaction_manager`: transaction manager to use.  None means
  489.              use the default transaction manager.
  490.           - `synch`: boolean indicating whether Connection should
  491.              register for afterCompletion() calls.
  492.         '''
  493.         self._a()
  494.         
  495.         try:
  496.             pool = self._pools.get(version)
  497.             if pool is None:
  498.                 if version:
  499.                     size = self._version_pool_size
  500.                 else:
  501.                     size = self._pool_size
  502.                 self._pools[version] = pool = _ConnectionPool(size)
  503.             
  504.             if not pool is not None:
  505.                 raise AssertionError
  506.             result = pool.pop()
  507.             if result is None:
  508.                 if version:
  509.                     size = self._version_cache_size
  510.                 else:
  511.                     size = self._cache_size
  512.                 c = self.klass(self, version, size)
  513.                 pool.push(c)
  514.                 result = pool.pop()
  515.             
  516.             if not result is not None:
  517.                 raise AssertionError
  518.             result.open(transaction_manager, mvcc, synch)
  519.             self._connectionMap((lambda c: c.cacheGC()))
  520.             return result
  521.         finally:
  522.             self._r()
  523.  
  524.  
  525.     
  526.     def removeVersionPool(self, version):
  527.         
  528.         try:
  529.             del self._pools[version]
  530.         except KeyError:
  531.             pass
  532.  
  533.  
  534.     
  535.     def connectionDebugInfo(self):
  536.         result = []
  537.         t = time()
  538.         
  539.         def get_info(c):
  540.             o = c._opened
  541.             d = c.getDebugInfo()
  542.             if d:
  543.                 if len(d) == 1:
  544.                     d = d[0]
  545.                 
  546.             else:
  547.                 d = ''
  548.             d = '%s (%s)' % (d, len(c._cache))
  549.             if o:
  550.                 pass
  551.             result.append({
  552.                 'opened': '%s (%.2fs)' % (ctime(o), t - o),
  553.                 'info': d,
  554.                 'version': version })
  555.  
  556.         for version, pool in self._pools.items():
  557.             pool.map(get_info)
  558.         
  559.         return result
  560.  
  561.     
  562.     def getActivityMonitor(self):
  563.         return self._activity_monitor
  564.  
  565.     
  566.     def pack(self, t = None, days = 0):
  567.         '''Pack the storage, deleting unused object revisions.
  568.  
  569.         A pack is always performed relative to a particular time, by
  570.         default the current time.  All object revisions that are not
  571.         reachable as of the pack time are deleted from the storage.
  572.  
  573.         The cost of this operation varies by storage, but it is
  574.         usually an expensive operation.
  575.  
  576.         There are two optional arguments that can be used to set the
  577.         pack time: t, pack time in seconds since the epcoh, and days,
  578.         the number of days to subtract from t or from the current
  579.         time if t is not specified.
  580.         '''
  581.         if t is None:
  582.             t = time()
  583.         
  584.         t -= days * 86400
  585.         
  586.         try:
  587.             self._storage.pack(t, referencesf)
  588.         except:
  589.             logger.error('packing', exc_info = True)
  590.             raise 
  591.  
  592.  
  593.     
  594.     def setActivityMonitor(self, am):
  595.         self._activity_monitor = am
  596.  
  597.     
  598.     def classFactory(self, connection, modulename, globalname):
  599.         return find_global(modulename, globalname)
  600.  
  601.     
  602.     def setCacheSize(self, size):
  603.         self._a()
  604.         
  605.         try:
  606.             self._cache_size = size
  607.             pool = self._pools.get('')
  608.             if pool is not None:
  609.                 
  610.                 def setsize(c):
  611.                     c._cache.cache_size = size
  612.  
  613.                 pool.map(setsize)
  614.         finally:
  615.             self._r()
  616.  
  617.  
  618.     
  619.     def setVersionCacheSize(self, size):
  620.         self._a()
  621.         
  622.         try:
  623.             self._version_cache_size = size
  624.             
  625.             def setsize(c):
  626.                 c._cache.cache_size = size
  627.  
  628.             for version, pool in self._pools.items():
  629.                 if version:
  630.                     pool.map(setsize)
  631.                     continue
  632.         finally:
  633.             self._r()
  634.  
  635.  
  636.     
  637.     def setPoolSize(self, size):
  638.         self._pool_size = size
  639.         self._reset_pool_sizes(size, for_versions = False)
  640.  
  641.     
  642.     def setVersionPoolSize(self, size):
  643.         self._version_pool_size = size
  644.         self._reset_pool_sizes(size, for_versions = True)
  645.  
  646.     
  647.     def _reset_pool_sizes(self, size, for_versions = False):
  648.         self._a()
  649.         
  650.         try:
  651.             for version, pool in self._pools.items():
  652.                 if (version != '') == for_versions:
  653.                     pool.set_pool_size(size)
  654.                     continue
  655.         finally:
  656.             self._r()
  657.  
  658.  
  659.     
  660.     def undo(self, id, txn = None):
  661.         '''Undo a transaction identified by id.
  662.  
  663.         A transaction can be undone if all of the objects involved in
  664.         the transaction were not modified subsequently, if any
  665.         modifications can be resolved by conflict resolution, or if
  666.         subsequent changes resulted in the same object state.
  667.  
  668.         The value of id should be generated by calling undoLog()
  669.         or undoInfo().  The value of id is not the same as a
  670.         transaction id used by other methods; it is unique to undo().
  671.  
  672.         :Parameters:
  673.           - `id`: a storage-specific transaction identifier
  674.           - `txn`: transaction context to use for undo().
  675.             By default, uses the current transaction.
  676.         '''
  677.         if txn is None:
  678.             txn = transaction.get()
  679.         
  680.         txn.register(TransactionalUndo(self, id))
  681.  
  682.     
  683.     def versionEmpty(self, version):
  684.         return self._storage.versionEmpty(version)
  685.  
  686.  
  687.  
  688. class ResourceManager(object):
  689.     '''Transaction participation for a version or undo resource.'''
  690.     
  691.     def __init__(self, db):
  692.         self._db = db
  693.         self.tpc_vote = self._db._storage.tpc_vote
  694.         self.tpc_finish = self._db._storage.tpc_finish
  695.         self.tpc_abort = self._db._storage.tpc_abort
  696.  
  697.     
  698.     def sortKey(self):
  699.         return '%s:%s' % (self._db._storage.sortKey(), id(self))
  700.  
  701.     
  702.     def tpc_begin(self, txn, sub = False):
  703.         if sub:
  704.             raise ValueError("doesn't support sub-transactions")
  705.         
  706.         self._db._storage.tpc_begin(txn)
  707.  
  708.     
  709.     def abort(self, obj, txn):
  710.         pass
  711.  
  712.     
  713.     def commit(self, obj, txn):
  714.         pass
  715.  
  716.  
  717.  
  718. class CommitVersion(ResourceManager):
  719.     
  720.     def __init__(self, db, version, dest = ''):
  721.         super(CommitVersion, self).__init__(db)
  722.         self._version = version
  723.         self._dest = dest
  724.  
  725.     
  726.     def commit(self, ob, t):
  727.         dest = self._dest
  728.         (tid, oids) = self._db._storage.commitVersion(self._version, self._dest, t)
  729.         oids = dict.fromkeys(oids, 1)
  730.         self._db.invalidate(tid, oids, version = self._dest)
  731.         if self._dest:
  732.             self._db.invalidate(tid, oids, version = self._version)
  733.         
  734.  
  735.  
  736.  
  737. class AbortVersion(ResourceManager):
  738.     
  739.     def __init__(self, db, version):
  740.         super(AbortVersion, self).__init__(db)
  741.         self._version = version
  742.  
  743.     
  744.     def commit(self, ob, t):
  745.         (tid, oids) = self._db._storage.abortVersion(self._version, t)
  746.         self._db.invalidate(tid, dict.fromkeys(oids, 1), version = self._version)
  747.  
  748.  
  749.  
  750. class TransactionalUndo(ResourceManager):
  751.     
  752.     def __init__(self, db, tid):
  753.         super(TransactionalUndo, self).__init__(db)
  754.         self._tid = tid
  755.  
  756.     
  757.     def commit(self, ob, t):
  758.         (tid, oids) = self._db._storage.undo(self._tid, t)
  759.         self._db.invalidate(tid, dict.fromkeys(oids, 1))
  760.  
  761.  
  762.